Amazon SageMaker Processingを試してみた #reinvent
どうも、DA事業本部の大澤です。
昨日発表されたAmazon SageMaker Processingを使ってみたので、その様子をご紹介します。
やってみた
今回は公式ブログで紹介されていた内容をもとに試してみました。
Irisデータセットを読み込み、分割し、出力するというシンプルな内容です。
準備
前処理実行時にSageMakerが使用するIAMロールのARNや処理後データの保存場所を決めておきます。
import sagemaker from os import path from sklearn import datasets from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput import pandas as pd bucket = 'osawa-test-ml' key_prefix = 'sagemaker-processing-test' prefix = f's3://{bucket}/{key_prefix}' destination = { 'train': path.join(prefix, 'train'), 'validation': path.join(prefix, 'validation'), 'test': path.join(prefix, 'test') } role = 'role_arn' # role = sagemaker.get_execution_role()
データ準備
Irisのデータを読み込んで、CSVとして保存します。
iris = datasets.load_iris() pd.DataFrame(iris.data, columns=iris.feature_names).to_csv('dataset.csv', index=False)
前処理用スクリプト
データを読み込み、ただデータを分割し、保存するだけの処理です。
ファイルの入出力場所については、inputs
とoutputs
として後ほどProcessorのrun
時に設定するパスと合わせます。
import pandas as pd import os from sklearn.model_selection import train_test_split # ローカルにデータを読み込む df = pd.read_csv('/opt/ml/processing/input/dataset.csv') # データ分割 train, test = train_test_split(df, test_size=0.2) train, validation = train_test_split(train, test_size=0.2) # データの出力先ディレクトリを作成 os.makedirs('/opt/ml/processing/output/train', exist_ok=True) os.makedirs('/opt/ml/processing/output/validation', exist_ok=True) os.makedirs('/opt/ml/processing/output/test', exist_ok=True) # データをローカルに保存 train.to_csv("/opt/ml/processing/output/train/train.csv") train.to_csv("/opt/ml/processing/output/train/train.csv", index=False) validation.to_csv("/opt/ml/processing/output/validation/validation.csv", index=False) test.to_csv("/opt/ml/processing/output/test/test.csv", index=False) print('Finished running processing job')
前処理ジョブの作成
SKLearnProcessor
を使って、環境情報を設定します。
設定可能な引数は次の通りです。
- framework_version (str): 使用するscikit-learnのバージョン
- role (str): SageMakerが使用するIAMロールのARN
- instance_type (str): インスタンスタイプ
- instance_count (int): インスタンス数
- command ([str]): 実行時に使用するコマンド 例: ["python3", "-v"]
- volume_size_in_gb (int): 処理実行時に使用するEBSのサイズ
- volume_kms_key (str): 処理実行時に使用するストレージボリュームを暗号化するKMSキーのARN
- output_kms_key (str): ProcessingOutputsを保存する際に使用するKMSキーのARN
- max_runtime_in_seconds (int): タイムアウト秒数
- base_job_name (str): 処理ジョブ
- sagemaker_session (sagemaker.session.Session): SageMakerのセッション
- env (dict): ジョブ実行時に設定する環境変数
- tags ([dict]): 処理ジョブに指定するタグ
- network_config (sagemaker.network.NetworkConfig): ネットワーク情報
sklearn_processor = SKLearnProcessor(framework_version='0.20.0', role=role, instance_count=1, instance_type='ml.m5.xlarge')
run
によって、code
に指定したスクリプトファイルがSageMaker上で実行されます。
指定可能な引数は次の通りです。
- code (str): 実行するスクリプトファイルのパス。ローカル or S3
- inputs ([sagemaker.processing.ProcessingInput]): 入力ファイル情報
- outputs ([str or sagemaker.processing.ProcessingOutput]): 出力ファイル情報
- arguments ([str]): 実行時の引数
- wait (bool): ジョブの完了を待つかどうか
- logs (bool): ジョブ実行時のログを出力するかどうか(wait=Trueの時に有効)
- job_name (str):ジョブ名
- experiment_config (dict[str, str]): 実験管理情報。キーは'ExperimentName', 'TrialName','TrialComponentDisplayName'
sklearn_processor.run( code='preprocessing.py', inputs=[ProcessingInput( source='dataset.csv', destination='/opt/ml/processing/input')], outputs=[ProcessingOutput(source='/opt/ml/processing/output/train', destination=destination['train']), ProcessingOutput(source='/opt/ml/processing/output/validation', destination=destination['validation']), ProcessingOutput(source='/opt/ml/processing/output/test', destination=destination['test'])] )
5分弱ほどでジョブが完了しました
ジョブの確認
ジョブ情報はjobs
を参照することで確認できます。
sklearn_processor.jobs[0].describe().keys()
次のような情報が格納されています。
S3からデータを取ってきて、データが出力されているか確認します。
for output in sklearn_processor.jobs[0].outputs: dest = output.destination basename = path.basename(dest) !aws s3 cp {dest} ./{basename} --recursive
無事出力されているようです。
さいごに
Amazon SageMaker Processingを使って処理してみた内容をお伝えしました。思っていたより簡単に実行でき、便利そうです。これからどんどん活用していきたいと思います。